package com.ruoyi.system.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import com.ruoyi.common.core.utils.Helper;
import com.ruoyi.common.security.service.TokenService;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.system.api.model.LoginUser;
import com.ruoyi.system.domain.LiveMessageCode;
import com.ruoyi.system.domain.vo.LiveChatMessageVO;
import com.ruoyi.system.domain.vo.LiveUserVO;
import org.apache.tomcat.util.http.ResponseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * @author hanjinqun
 * @date 2022/10/24
 * websocket操作类
 */
@Component
@ServerEndpoint(value = "/websocket/{liveRoom}", configurator = WebSocketConfig.class)
public class WebSocketServer {

    @Resource
    TokenService tokenService;
    /**
     * 日志工具
     */
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 与某个客户端的连接会话，需要通过它来给客户端发送数据
     */
    private Session session;
    /**
     * 用户id
     */
    private String userId;

    private String liveRoom;
    /**
     * 用来存放每个客户端对应的MyWebSocket对象
     */
    private final static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
    /**
     * 用来存在线连接用户信息
     */
    private final static ConcurrentHashMap<String, LiveUserVO> sessionPool = new ConcurrentHashMap<>();


    /**
     * 第一个map的key存放的为房间号
     * 第二个map的key存放的是身份，value存放的是用户信息
     */
    private final static ConcurrentHashMap<String, HashMap<Integer, List<LiveUserVO>>> livePool = new ConcurrentHashMap<>();



    /**
     * 链接成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "liveRoom") String liveRoom) {
        try {
            logger.info("websokcet.......进来了OnOpen");
            this.session = session;
            Random random = new Random();
            String userId = liveRoom + "-" + random.nextInt(10000);
            while (sessionPool.containsKey(userId)) {
                userId = liveRoom + "-" + random.nextInt(10000); // 存在 重新生成一个新的
            }
            this.userId = userId;
            this.liveRoom = liveRoom;

            LiveChatMessageVO mess = new LiveChatMessageVO();
            HashMap<Integer, List<LiveUserVO>> map = livePool.get(liveRoom) != null ? livePool.get(liveRoom) : new HashMap<>();
            List<LiveUserVO> userVOS = map.get(0) != null ? map.get(0) : new ArrayList<>();
            // 新增新身份到map中
            LiveUserVO liveUserVO = new LiveUserVO(userId, 0, session);
            liveUserVO.setConnectTime(LocalDateTime.now()); // 连接时间
            userVOS.add(liveUserVO);
            map.put(0, userVOS);
            livePool.put(liveRoom, map);
            sessionPool.put(userId, liveUserVO); // 可根据通信userId获取身份信息

            mess.setCode(LiveMessageCode.CONNECT_SUCCESS); // 连接成功消息
            mess.setMessage(userId);
            session.getAsyncRemote().sendText(JSON.toJSONString(mess)); // 发送回调消息给web端
        } catch (Exception e) {
        }
    }

    /**
     * 链接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        try {
            LiveUserVO userVO = sessionPool.get(this.userId);

            HashMap<Integer, List<LiveUserVO>> map = livePool.get(this.liveRoom);
            List<LiveUserVO> userVOS = map.get(userVO.getIdentity());
            userVOS.remove(userVO); // 删除当前身份
            webSockets.remove(this);
            sessionPool.remove(this.userId);
            logger.info("【websocket消息】连接断开，总数为:" + webSockets.size());
        } catch (Exception e) {
            logger.error(e.toString());
        }
    }

    /**
     * 发送系统消息
     *
     * @param roomId
     * @param message
     */
    public void systemSendRoomMessage(String roomId, String message) {
        HashMap<Integer, List<LiveUserVO>> map = livePool.get(roomId);
        if (map == null) return; // 直播间无人

        // 发送给直播间的所有人
        for (Integer type : map.keySet()) {
            for (LiveUserVO user : map.get(type)) {
                // 给开启会话的用户发送消息
                if (user.getSession().isOpen()) {
                    user.getSession().getAsyncRemote().sendText(message); // 发送转换为json的消息
                }
            }
        }
    }

    /**
     * 发送直播间消息
     *
     * @param liveRoomId 直播间id
     * @param code       前端交互的状态码
     * @param message    发送的消息
     */
    public void sendMessageByCode(String liveRoomId, Integer code, String message) {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", code);
        map.put("message", message);
        systemSendRoomMessage(liveRoomId, JSON.toJSONString(map));
    }
}
